RxJs 学习

RxJs

最近在研究一下 RxJs ,这是一个非常强大的用于响应式编程的库,学习难度也比较高,想要学习的人最好要对发布订阅模式和函数式编程比较熟悉,这样学习起来就会更加的快捷顺畅。

函数式编程的教程网上有很多,推荐大家可以看这个 https://github.com/llh911001/mostly-adequate-guide-chinese, 英语能力比较强的人可以去看一下英文原版的教程。

在有了对函数式编程和发布订阅模式的了解之后,学习 RxJs 才会比较的容易。因为这里面充斥着大量的概念。

用途

要学习一个东西的时候,我最好带着目的去学习,所以首先我得明白我们为什么要花这么大的功夫去学习和使用这个库。

  • 处理比较复杂的异步逻辑的时候,它的一套规范,能让你很容易的写出高可维护,高拓展性的代码
  • 函数式编程,这是一个非常棒的编程思想,在开发大型应用到的时候能够加强代码的可维护性。
  • 处理多并发异步操作的时候,能够更加简单明了

基本概念

学习 RxJS 我们先要了解清楚其中的几个基本概念。

Observable (被观察者)

这是 RxJS 最核心的部分,一个可被订阅的对象。

Observer (观察者)

这个单词和 Observable 非常的相似,用于订阅Observable,RxJs 也提供了他的实现接口

1
2
3
4
5
6
7

interface Observer<T> {
closed?: boolean;
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
  • closed:会在Observer 取消订阅的时候调用
  • next:是用来接收Observable发出来的消息
  • error:用来接收Observable发出的error
  • complete:当 Observable 执行complete的时候会调用

Subscription (订阅)

当 Observable 添加订阅的时候会返回一个 Subscription,主要用来取消订阅的

Operators (操作符)

操作符,使用函数式编程风格的纯函数,我们可以放心大胆的使用它而不用去担心对外部环境的影响

Subject (主体)

可以看做是一个特殊的 Observable,能够同时将信息推送给多个Observer,而Observable一个subscribe只会发送给一个Observer。

Schedulers (调度器)

调度器控制着何时启动 subscription 和何时发送通知,可以用来实现异步的通知。

一些用法

简单用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 创建Observables
var observable = Rx.Observable.create(function subscribe(observer) {
observer.next(‘any value’)
});

// 创建Observer
vat observer = {
next:(val) => {
console.log(val);
},
};

// 添加订阅
var subscription = observable.subscribe(observer); // 输出any value

// 取消订阅
subscription.unsubscribe();

Subject 订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

// 新建主体
var subject = new Rx.Subjecgt();

var observerA = {
next:(val) => {
console.log('this is ObserverA', val);
}
};

var ObserverB = {
next:(val) => {
console.log('this is ObserverB', val);
}
};

subject.subscribe(ObserverA);
subject.subscribe(ObserverB);

subject.next(1);
// 输出
// this is ObserverA 1
// this is ObserverB 1

多播的 Observables

Observable 只能给一个 Observer 发送消息,而多播的 Observables 可以给多个 Observer发送消息

Observables 底层本质上是用 Subject 让多个 Observer 观察到同一个 Observable 执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

var source = Rx.observable.from([1,2]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// 本质上是在 subject.subscribe();
multicasted.subscribe((val) => {
console.log('Observer A', val);
});
multicasted.subscribe((val) => {
console.log('Observer B', val);
});

// 本质上是 source.subscribe(subject);
multicasted.connect();

refCount 引用计数

有时候我们想要当第一个订阅者添加的时候自动的去 connect,在最后一个订阅者取消订阅的时候,取消连接。

我们可以使用 ConnectableObservable 的 refCount() 方法来生成Observable,这个Observable 在有第一个订阅者的时候自动的进行connect,然后在最后一个订阅者取消订阅的时候停止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

var source = Rx.Observable.interval(500); // 此方法会在给定的时间间隔发出连续的数字
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();

var subscriptionA = refCounted.subscribe({
next:(val) => {
console.log('OberverA:',val);
},
});
var subscriptionB = refCounted.subscribe({
next:(val) => {
console.log('OberverB:',val);
},
});
setTimeout(() => {
subscriptionA.unsubscribe();
}, 600);

setTimeout(() => {
// 此时 共享的Observable 将会停止,因此refCounted后面不会再有订阅者
subscriptionB.unsubscribe();
}, 1200);

BehaviourSubject

BehaviourSubject 是 Subject 的一个变体,他会将当前值传给新新订阅的订阅者。

1
2
3
4
5
6
7
8
9
10

var subject = new Rx.BehaviourSubject(0); // 给与一个初始值

var observer = {
next:(val) => {
console.log(val)
}
};

subject.subscribe(observer);// 输出 0

ReplySubject

ReplySubject 可以缓存值,你可以指定缓存多少个值发送给新的订阅者,也可以缓存多少时间内的值发送给新的订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13

var subject = new Rx.ReplySubject(2);// 缓存2个值

subject.next(0);
subject.next(1);
subject.next(2);

subject.subscribe({
next:(val) => {
console.log(val)
}
});
// 输出1,2

AsyncSubject

只有当 Observable 执行 complete 的时候,AsyncSubject 才会将最后一个值发送给订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14

var subject = new Rx.AsyncSubject();

subject.subscribe({
next:(val) => {
console.log(val);
},
});

subject.next(1);
subject.next(2);
subject.next(3);

subject.complete(); // 这时候才会输出3

Operators 操作符

RxJs 提供了很多的操作符,他们都是基于函数式编程的思想实现的。在使用操作符的时候,他们并不会修改原先的 Observable,而是返回一个新的 Observable,这是一个无副作用的操作,大大提高了我们程序的可维护性。

根据原理,我们也可以自己定义一个操作符函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

var myOperator = function (observable) {
// 这里我们只是返回一个新的 Observable ,这个Observable 会让输入的值都加1
return Rx.Observable.create(function subscribe(observer){
observable.subscribe({
next:(v) => observer.next(v + 1);
});
});
};

var source = Rx.Observable.from([1,2]);
var observable = myOperator(source);

observable.subscripe((val) => {
console.log(val);
});
// 输出2,3